---
title: task_runs
sidebarTitle: task_runs
---

# `prefect.server.models.task_runs`



Functions for interacting with task run ORM objects.
Intended for internal use by the Prefect REST API.


## Functions

### `create_task_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L51" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
create_task_run(db: PrefectDBInterface, session: AsyncSession, task_run: schemas.core.TaskRun, orchestration_parameters: Optional[Dict[str, Any]] = None) -> orm_models.TaskRun
```


Creates a new task run.

If a task run with the same flow_run_id, task_key, and dynamic_key already exists,
the existing task run will be returned. If the provided task run has a state
attached, it will also be created.

**Args:**
- `session`: a database session
- `task_run`: a task run model

**Returns:**
- orm_models.TaskRun: the newly-created or existing task run


### `update_task_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L151" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
update_task_run(db: PrefectDBInterface, session: AsyncSession, task_run_id: UUID, task_run: schemas.actions.TaskRunUpdate) -> bool
```


Updates a task run.

**Args:**
- `session`: a database session
- `task_run_id`: the task run id to update
- `task_run`: a task run model

**Returns:**
- whether or not matching rows were found to update


### `read_task_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L180" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
read_task_run(db: PrefectDBInterface, session: AsyncSession, task_run_id: UUID) -> Union[orm_models.TaskRun, None]
```


Read a task run by id.

**Args:**
- `session`: a database session
- `task_run_id`: the task run id

**Returns:**
- orm_models.TaskRun: the task run


### `read_task_run_with_flow_run_name` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L199" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
read_task_run_with_flow_run_name(db: PrefectDBInterface, session: AsyncSession, task_run_id: UUID) -> Union[orm_models.TaskRun, None]
```


Read a task run by id.

**Args:**
- `session`: a database session
- `task_run_id`: the task run id

**Returns:**
- orm_models.TaskRun: the task run with the flow run name


### `read_task_runs` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L310" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
read_task_runs(db: PrefectDBInterface, session: AsyncSession, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None, offset: Optional[int] = None, limit: Optional[int] = None, sort: schemas.sorting.TaskRunSort = schemas.sorting.TaskRunSort.ID_DESC) -> Sequence[orm_models.TaskRun]
```


Read task runs.

**Args:**
- `session`: a database session
- `flow_filter`: only select task runs whose flows match these filters
- `flow_run_filter`: only select task runs whose flow runs match these filters
- `task_run_filter`: only select task runs that match these filters
- `deployment_filter`: only select task runs whose deployments match these filters
- `offset`: Query offset
- `limit`: Query limit
- `sort`: Query sort

**Returns:**
- List[orm_models.TaskRun]: the task runs


### `count_task_runs` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L361" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
count_task_runs(db: PrefectDBInterface, session: AsyncSession, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None) -> int
```


Count task runs.

**Args:**
- `session`: a database session
- `flow_filter`: only count task runs whose flows match these filters
- `flow_run_filter`: only count task runs whose flow runs match these filters
- `task_run_filter`: only count task runs that match these filters
- `deployment_filter`: only count task runs whose deployments match these filters

Returns:
    int: count of task runs


### `count_task_runs_by_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L418" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
count_task_runs_by_state(db: PrefectDBInterface, session: AsyncSession, flow_filter: Optional[schemas.filters.FlowFilter] = None, flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None, task_run_filter: Optional[schemas.filters.TaskRunFilter] = None, deployment_filter: Optional[schemas.filters.DeploymentFilter] = None) -> schemas.states.CountByState
```


Count task runs by state.

**Args:**
- `session`: a database session
- `flow_filter`: only count task runs whose flows match these filters
- `flow_run_filter`: only count task runs whose flow runs match these filters
- `task_run_filter`: only count task runs that match these filters
- `deployment_filter`: only count task runs whose deployments match these filters

Returns:
    schemas.states.CountByState: count of task runs by state


### `delete_task_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L466" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
delete_task_run(db: PrefectDBInterface, session: AsyncSession, task_run_id: UUID) -> bool
```


Delete a task run by id.

**Args:**
- `session`: a database session
- `task_run_id`: the task run id to delete

**Returns:**
- whether or not the task run was deleted


### `set_task_run_state` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L486" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
set_task_run_state(session: AsyncSession, task_run_id: UUID, state: schemas.states.State, force: bool = False, task_policy: Optional[Type[TaskRunOrchestrationPolicy]] = None, orchestration_parameters: Optional[Dict[str, Any]] = None) -> OrchestrationResult
```


Creates a new orchestrated task run state.

Setting a new state on a run is the one of the principal actions that is governed by
Prefect's orchestration logic. Setting a new run state will not guarantee creation,
but instead trigger orchestration rules to govern the proposed `state` input. If
the state is considered valid, it will be written to the database. Otherwise, a
it's possible a different state, or no state, will be created. A `force` flag is
supplied to bypass a subset of orchestration logic.

**Args:**
- `session`: a database session
- `task_run_id`: the task run id
- `state`: a task run state model
- `force`: if False, orchestration rules will be applied that may alter or prevent
the state transition. If True, orchestration rules are not applied.

**Returns:**
- OrchestrationResult object


### `with_system_labels_for_task_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/server/models/task_runs.py#L570" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
with_system_labels_for_task_run(session: AsyncSession, task_run: schemas.core.TaskRun) -> schemas.core.KeyValueLabels
```


Augment user supplied labels with system default labels for a task
run.

